use std::{ops::Deref, sync::Arc};

use qbase::{
    Epoch, GetEpoch,
    error::{Error, QuicError},
    frame::{ConnectionCloseFrame, CryptoFrame, Frame as V1Frame},
    net::tx::Signals,
    packet::{
        header::{GetScid, long::InitialHeader},
        io::PacketSpace,
        keys::{ArcKeys, Keys},
    },
    token::TokenRegistry,
    util::BoundQueue,
};
use qcongestion::{Feedback, Transport};
use qevent::{
    quic::{
        PacketHeader, PacketType, QuicFramesCollector,
        recovery::{PacketLost, PacketLostTrigger},
    },
    telemetry::Instrument,
};
use qinterface::{
    packet::{CipherPacket, PlainPacket},
    route::Way,
};
use qrecovery::crypto::CryptoStream;
use tokio::sync::mpsc;

use crate::{
    Components, InitialJournal,
    events::{ArcEventBroker, EmitEvent, Event},
    path::{self, Path, error::CreatePathFailure},
    space::{
        AckInitialSpace, Frame, assemble_closing_packet, filter_odcid_packet, pipe,
        read_plain_packet,
    },
    state,
    termination::Terminator,
    tx::{PacketWriter, TrivialPacketWriter},
};

pub type CipherInitialPacket = CipherPacket<InitialHeader>;
pub type PlainInitialPacket = PlainPacket<InitialHeader>;
pub type ReceivedFrom = (CipherInitialPacket, Way);

pub struct InitialSpace {
    keys: ArcKeys,
    journal: InitialJournal,
}

impl AsRef<InitialJournal> for InitialSpace {
    fn as_ref(&self) -> &InitialJournal {
        &self.journal
    }
}

impl InitialSpace {
    // Initial keys应该是预先知道的，或者传入dcid，可以构造出来
    pub fn new(keys: Keys) -> Self {
        let journal = InitialJournal::with_capacity(16, None);
        Self {
            keys: ArcKeys::with_keys(keys),
            journal,
        }
    }

    pub fn keys(&self) -> ArcKeys {
        self.keys.clone()
    }

    pub async fn decrypt_packet(
        &self,
        packet: CipherInitialPacket,
    ) -> Option<Result<PlainInitialPacket, QuicError>> {
        match self.keys.get_remote_keys().await {
            Some(keys) => packet.decrypt_long_packet(
                keys.remote.header.as_ref(),
                keys.remote.packet.as_ref(),
                |pn| self.journal.of_rcvd_packets().decode_pn(pn),
            ),
            None => {
                packet.drop_on_key_unavailable();
                None
            }
        }
    }

    pub fn tracker(&self, crypto_stream: CryptoStream) -> InitialTracker {
        InitialTracker {
            journal: self.journal.clone(),
            crypto_stream,
        }
    }
}

impl GetEpoch for InitialSpace {
    fn epoch(&self) -> Epoch {
        Epoch::Initial
    }
}

impl path::PacketSpace<InitialHeader> for InitialSpace {
    type JournalFrame = CryptoFrame;

    fn new_packet<'b, 's>(
        &'s self,
        header: InitialHeader,
        cc: &qcongestion::ArcCC,
        buffer: &'b mut [u8],
    ) -> Result<PacketWriter<'b, 's, CryptoFrame>, Signals> {
        let keys = self.keys.get_local_keys().ok_or(Signals::KEYS)?;
        let (retran_timeout, expire_timeout) = cc.retransmit_and_expire_time(Epoch::Handshake);
        PacketWriter::new_long(
            header,
            buffer,
            keys.local,
            self.journal.as_ref(),
            retran_timeout,
            expire_timeout,
        )
    }
}

impl PacketSpace<InitialHeader> for InitialSpace {
    type PacketAssembler<'a> = TrivialPacketWriter<'a, 'a, CryptoFrame>;

    #[inline]
    fn new_packet<'a>(
        &'a self,
        header: InitialHeader,
        buffer: &'a mut [u8],
    ) -> Result<Self::PacketAssembler<'a>, Signals> {
        let keys = self.keys.get_local_keys().ok_or(Signals::KEYS)?;
        TrivialPacketWriter::new_long(header, buffer, keys.local, self.journal.as_ref())
    }
}

fn frame_dispathcer(
    space: &InitialSpace,
    components: &Components,
    event_broker: &ArcEventBroker,
) -> impl for<'p> Fn(Frame, &'p Path) {
    let (crypto_frames_entry, rcvd_crypto_frames) = mpsc::unbounded_channel();
    let (ack_frames_entry, rcvd_ack_frames) = mpsc::unbounded_channel();

    pipe(
        rcvd_crypto_frames,
        components.crypto_streams[space.epoch()].incoming(),
        event_broker.clone(),
    );
    pipe(
        rcvd_ack_frames,
        AckInitialSpace::new(&space.journal, &components.crypto_streams[space.epoch()]),
        event_broker.clone(),
    );

    let event_broker = event_broker.clone();
    let rcvd_joural = space.journal.of_rcvd_packets();
    let dispatch_v1_frame = move |frame: V1Frame, path: &Path| match frame {
        V1Frame::Ack(f) => {
            path.cc().on_ack_rcvd(Epoch::Initial, &f);
            rcvd_joural.on_rcvd_ack(&f);
            _ = ack_frames_entry.send(f);
        }
        V1Frame::Close(f) => event_broker.emit(Event::Closed(f)),
        V1Frame::Crypto(f, bytes) => _ = crypto_frames_entry.send((f, bytes)),
        V1Frame::Padding(_) | V1Frame::Ping(_) => {}
        _ => unreachable!("unexpected frame: {:?} in handshake packet", frame),
    };
    move |frame, path| match frame {
        Frame::V1(frame) => dispatch_v1_frame(frame, path),
    }
}

async fn parse_normal_packet(
    (packet, (bind_uri, pathway, link)): ReceivedFrom,
    space: &InitialSpace,
    components: &Components,
    dispatch_frame: impl Fn(Frame, &Path),
) -> Result<(), Error> {
    let parameters = &components.parameters;
    let paths = &components.paths;
    let remote_cids = &components.cid_registry.remote;

    let validate_token = {
        let token_registry = &components.token_registry;
        let tls_handshake = &components.tls_handshake;
        |initial_token: &[u8], path: &Path| {
            if let TokenRegistry::Server(provider) = token_registry.deref() {
                if let Ok(Some(server_name)) = tls_handshake.server_name() {
                    if provider.verify_token(server_name, initial_token) {
                        path.grant_anti_amplification();
                    }
                }
            }
        }
    };

    // rfc9000 7.2:
    // if subsequent Initial packets include a different Source Connection ID, they MUST be discarded. This avoids
    // unpredictable outcomes that might otherwise result from stateless processing of multiple Initial packets
    // with different Source Connection IDs.
    if matches!(parameters.lock_guard()?.initial_scid_from_peer(), Some(scid) if scid != *packet.scid())
    {
        packet.drop_on_scid_unmatch();
        return Ok(());
    }

    let Some(packet) = space.decrypt_packet(packet).await.transpose()? else {
        return Ok(());
    };

    let path = match components.get_or_try_create_path(bind_uri, link, pathway, true) {
        Ok(path) => path,
        Err(CreatePathFailure::ConnectionClosed(..)) => {
            packet.drop_on_conenction_closed();
            return Ok(());
        }
        Err(CreatePathFailure::NoInterface(..)) => {
            packet.drop_on_interface_not_found();
            return Ok(());
        }
    };

    let Some(packet) = filter_odcid_packet(packet, &components.specific) else {
        return Ok(());
    };

    let packet_contains = read_plain_packet(&packet, |frame| dispatch_frame(frame, &path))?;

    space.journal.of_rcvd_packets().on_rcvd_pn(
        packet.pn(),
        packet_contains.ack_eliciting(),
        path.cc().get_pto(Epoch::Initial),
    );
    path.on_packet_rcvd(Epoch::Initial, packet.pn(), packet.size(), packet_contains);

    // Negotiate handshake path
    if paths.assign_handshake_path(&path, remote_cids, *packet.scid()) {
        parameters
            .lock_guard()?
            .initial_scid_from_peer_need_equal(*packet.scid())?;
    }

    // See [RFC 9000 section 8.1](https://www.rfc-editor.org/rfc/rfc9000.html#name-address-validation-during-c)
    // A server might wish to validate the client address before starting the cryptographic handshake.
    // QUIC uses a token in the Initial packet to provide address validation prior to completing the handshake.
    // This token is delivered to the client during connection establishment with a Retry packet (see Section 8.1.2)
    // or in a previous connection using the NEW_TOKEN frame (see Section 8.1.3).
    if !packet.token().is_empty() {
        validate_token(packet.token(), &path);
    }
    Result::<(), Error>::Ok(())
}

fn parse_closing_packet(
    space: &InitialSpace,
    packet: CipherInitialPacket,
) -> Option<ConnectionCloseFrame> {
    // TOOD: improve Keys
    let remote_keys = space.keys.get_local_keys()?.remote;
    let packet = packet
        .decrypt_long_packet(
            remote_keys.header.as_ref(),
            remote_keys.packet.as_ref(),
            |pn| space.journal.of_rcvd_packets().decode_pn(pn),
        )
        .and_then(Result::ok)?;

    let mut ccf = None;
    _ = read_plain_packet(&packet, |frame| {
        ccf = ccf.take().or(match frame {
            Frame::V1(V1Frame::Close(ccf)) => Some(ccf),
            _ => None,
        })
    });
    ccf
}

pub async fn deliver_and_parse_packets(
    packets: BoundQueue<ReceivedFrom>,
    space: Arc<InitialSpace>,
    components: Components,
    event_broker: ArcEventBroker,
) {
    let conn_state = &components.conn_state;
    let dispatch_frame = frame_dispathcer(&space, &components, &event_broker);
    let normal_deliver_and_parse_loop = async {
        while let Some(form) = packets.recv().await {
            let span = qevent::span!(@current, path=form.1.2.to_string());
            let parse = parse_normal_packet(form, &space, &components, &dispatch_frame);
            if let Err(Error::Quic(error)) = Instrument::instrument(parse, span).await {
                event_broker.emit(Event::Failed(error));
            };
        }
    };

    let ccf = tokio::select! {
        // deliver and parse packets. complete when packet queue closed
        _ = normal_deliver_and_parse_loop => return,
        // connection terminated(enter closing/draining state)
        error = conn_state.terminated() => match conn_state.current() {
            // entered closing_state, keep receiving packets, and send ccf
            state if state == Some(state::CLOSING) => ConnectionCloseFrame::from(error),
            // entered other state, do nothing
            _ => return
        }
    };

    let terminator = Terminator::new(ccf, &components);
    // Release the primary connection state
    drop(components);

    while let Some((packet, (_bind_uri, pathway, _link))) = packets.recv().await {
        if let Some(ccf) = parse_closing_packet(&space, packet) {
            event_broker.emit(Event::Closed(ccf));
        }

        // TODO：尝试解决计数分离的问题？将收包统计转为连接和路径级？发送数据包交给路径？
        if terminator.should_send() {
            terminator
                .try_send_on(pathway, |buffer, ccf| {
                    assemble_closing_packet(space.as_ref(), &terminator, buffer, ccf)
                })
                .await
        }
    }
}

pub struct InitialTracker {
    journal: InitialJournal,
    crypto_stream: CryptoStream,
}

impl Feedback for InitialTracker {
    fn may_loss(&self, trigger: PacketLostTrigger, pns: &mut dyn Iterator<Item = u64>) {
        let sent_jornal = self.journal.of_sent_packets();
        let outgoing = self.crypto_stream.outgoing();
        let mut sent_packets = sent_jornal.rotate();
        for pn in pns {
            let mut may_lost_frames = QuicFramesCollector::<PacketLost>::new();
            for frame in sent_packets.may_loss_packet(pn) {
                may_lost_frames.extend([&frame]);
                outgoing.may_loss_data(&frame);
            }
            qevent::event!(PacketLost {
                header: PacketHeader {
                    packet_type: PacketType::Initial,
                    packet_number: pn
                },
                frames: may_lost_frames,
                trigger
            });
        }
    }
}
